//---------------------------------------------------------- -*- Mode: C++ -*-
// $Id: ClientSM.cc 325 2009-04-24 23:54:17Z sriramsrao $ 
//
// Created 2006/06/05
// Author: Sriram Rao
//
// Copyright 2008 Quantcast Corp.
// Copyright 2006-2008 Kosmix Corp.
//
// This file is part of Kosmos File System (KFS).
//
// Licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// 
// \file ClientSM.h
// \brief Handlers that interface with a KFS client.
//
//----------------------------------------------------------------------------


#include "ClientSM.h"
#include "ChunkServer.h"
#include "util.h"
using namespace KFS;

#include <string>
#include <sstream>
using std::string;
using std::ostringstream;

#include "common/log.h"
#include <boost/scoped_array.hpp>
using boost::scoped_array;

ClientSM::ClientSM(NetConnectionPtr &conn) : mNetConnection(conn), mOp(NULL)
{
	mClientIP = mNetConnection->GetPeerName();
	SET_HANDLER(this, &ClientSM::HandleRequest);
}

ClientSM::~ClientSM()
{
	delete mOp;
	while (!mPending.empty()) {
		mOp = mPending.front();
		mPending.pop_front();
		delete mOp;
	}
}

///
/// Send out the response to the client request.  The response is
/// generated by MetaRequest as per the protocol.
/// @param[in] op The request for which we finished execution.
///
void
ClientSM::SendResponse(MetaRequest *op)
{
	ostringstream os;

	op->response(os);
	mClientIP = mNetConnection->GetPeerName();

	if (op->op == META_ALLOCATE) {
		MetaAllocate *alloc = static_cast<MetaAllocate *>(op);
		ostringstream o;

		o << "alloc: " << alloc->chunkId << ' ' << alloc->chunkVersion << ' ';	 

		for (uint32_t i = 0; i < alloc->servers.size(); i++) {
			o << alloc->servers[i]->ServerID() << ' ';
		}
		KFS_LOG_VA_DEBUG("Client = %s, Allocate: %s", mClientIP.c_str(), o.str().c_str());
	}

	if (op->op != META_LOOKUP) {
		KFS_LOG_VA_INFO("Client = %s, Command %s, Status: %d", 
				mClientIP.c_str(), op->Show().c_str(), op->status);
	}

	mNetConnection->Write(os.str().c_str(), os.str().length());
}

///
/// Generic event handler.  Decode the event that occurred and
/// appropriately extract out the data and deal with the event.
/// @param[in] code: The type of event that occurred
/// @param[in] data: Data being passed in relative to the event that
/// occurred.
/// @retval 0 to indicate successful event handling; -1 otherwise.
///
int
ClientSM::HandleRequest(int code, void *data)
{
	IOBuffer *iobuf;
	MetaRequest *op;
	int cmdLen;

	switch (code) {
	case EVENT_NET_READ:
		// We read something from the network.  Run the RPC that
		// came in.
		iobuf = (IOBuffer *) data;
		if (IsMsgAvail(iobuf, &cmdLen))
			HandleClientCmd(iobuf, cmdLen);
		break;

	case EVENT_NET_WROTE:
		// Something went out on the network.  For now, we don't
		// track it. Later, we may use it for tracking throttling
		// and such.
		break;

	case EVENT_CMD_DONE:
		op = (MetaRequest *) data;
		assert(op == mOp);
		SendResponse(op);
		delete mOp;
		mOp = NULL;
		if (!mPending.empty()) {
			mOp = mPending.front();
			mPending.pop_front();
			SubmitOp();
		}
		break;

	case EVENT_NET_ERROR:
		// KFS_LOG_VA_DEBUG("Closing connection");

		if (mNetConnection)
			mNetConnection->Close();

		SET_HANDLER(this, &ClientSM::HandleTerminate);
		if (mOp == NULL)
			delete this;

		break;

	default:
		assert(!"Unknown event");
		return -1;
	}
	return 0;
}

///
/// Termination handler: if the client has any op queued in the system, wait 
/// for the op to finish.  Then, cleanup this.
/// @param[in] code: The type of event that occurred
/// @param[in] data: Data being passed in relative to the event that
/// occurred.
/// @retval 0 to indicate successful event handling; -1 otherwise.
///
int
ClientSM::HandleTerminate(int code, void *data)
{
	MetaRequest *op;

	switch (code) {
	case EVENT_CMD_DONE:
		op = (MetaRequest *) data;
		assert(op == mOp);
		delete mOp;
		mOp = NULL;
		break;
	default:
		assert(!"Unknown event");
		return -1;
	}

	delete this;
	return 0;
}

///
/// We have a command in a buffer. So, parse out the command and
/// execute it if possible. 
/// @param[in] iobuf: Buffer containing the command
/// @param[in] cmdLen: Length of the command in the buffer
/// 
void
ClientSM::HandleClientCmd(IOBuffer *iobuf, int cmdLen)
{
	scoped_array<char> buf(new char[cmdLen + 1]);
	MetaRequest *op;

	mClientIP = mNetConnection->GetPeerName();

	iobuf->CopyOut(buf.get(), cmdLen);
	buf[cmdLen] = '\0';
    
	if (ParseCommand(buf.get(), cmdLen, &op) != 0) {
		iobuf->Consume(cmdLen);

		KFS_LOG_VA_DEBUG("Aye?: %s", buf.get());
		// got a bogus command
		return;
	}

	// Command is ready to be pushed down.  So remove the cmd from the buffer.
	iobuf->Consume(cmdLen);

	if (mOp != NULL) {
		mPending.push_back(op);
		return;
	}

	mOp = op;

	SubmitOp();
}

void
ClientSM::SubmitOp()
{
	
	KFS_LOG_VA_DEBUG("Got command: %s", mOp->Show().c_str());

	if (mOp->op == META_ALLOCATE) {
		KFS_LOG_VA_INFO("Got allocate: %s", mOp->Show().c_str());
	}
	else if (mOp->op == META_GETLAYOUT) {
		KFS_LOG_VA_INFO("Client = %s, Command %s",
				mClientIP.c_str(), mOp->Show().c_str());
	}

	mOp->clnt = this;
	// send it on its merry way
	submit_request(mOp);
    
}
